TimeSeriesSVRTslearn (tslearn-style) — Support Vector Regression for time series

TimeSeriesSVRTslearn (tslearn-style) — Support Vector Regression for time series#

This notebook implements a tslearn-inspired time-series SVR wrapper:

  • TimeSeriesSVRTslearn(C=..., kernel=..., degree=..., ...)

  • supports standard SVR kernels on flattened windows (linear/poly/rbf/sigmoid)

  • adds a time-series-specific option: DTW-RBF kernel (kernel="dtw_rbf")

The goal is to have a runnable reference even without tslearn installed.

SVR in one page (kernel view)#

Given training pairs \((x_i, y_i)\), SVR learns a function \(f(x)\) that is flat (small norm) while keeping most errors within an \(\varepsilon\)-tube.

In the kernelized form, predictions are: \(\hat{y}(x) = \sum_{i=1}^{n} (\alpha_i - \alpha_i^*)\,K(x, x_i) + b\), where \(K(x,x') = \langle \Phi(x), \Phi(x')\rangle\) is a kernel.

Why special kernels for time series?#

Flattening a window treats misalignments as large errors. For sequences that may be locally shifted/stretched, an alignment distance (DTW) can be more faithful.

We implement a simple DTW-RBF kernel: \(K_{\text{DTW-RBF}}(x,x') = \exp(-\gamma\,\mathrm{DTW}(x,x')^2)\).

Note: an RBF of DTW is not guaranteed PSD for all settings; empirically it often works as a similarity for SVR.

import numpy as np
import pandas as pd

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import os
import plotly.io as pio

from sklearn.svm import SVR
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

from scipy import stats

# Pick the Plotly renderer from the environment (notebook fallback) and a clean theme.
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
pio.templates.default = "plotly_white"

# Module-wide RNG with a fixed seed so the simulated data is reproducible.
rng = np.random.default_rng(7)

# Record the library versions this notebook's outputs were produced with.
import numpy, pandas, sklearn, scipy, plotly
print("numpy:", numpy.__version__)
print("pandas:", pandas.__version__)
print("sklearn:", sklearn.__version__)
print("scipy:", scipy.__version__)
print("plotly:", plotly.__version__)
numpy: 1.26.2
pandas: 2.1.3
sklearn: 1.6.0
scipy: 1.15.0
plotly: 6.5.2
def _as_3d_panel(X: np.ndarray) -> np.ndarray:
    """Accept (n, m) or (n, d, m). Return (n, d, m)."""
    X = np.asarray(X, dtype=float)
    if X.ndim == 2:
        return X[:, None, :]
    if X.ndim == 3:
        return X
    raise ValueError(f"X must be 2D or 3D, got shape={X.shape}")


def _flatten_panel(X3: np.ndarray) -> np.ndarray:
    n, d, m = X3.shape
    return X3.reshape(n, d * m)


def dtw_cost(a: np.ndarray, b: np.ndarray, *, window: int | None = None) -> float:
    """DTW squared cost between sequences a and b.

    a, b: shape (m, d) (time-major).
    window: Sakoe-Chiba radius in time steps (None -> unconstrained).
    """
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    if a.ndim == 1:
        a = a.reshape(-1, 1)
    if b.ndim == 1:
        b = b.reshape(-1, 1)

    n, da = a.shape
    m, db = b.shape
    if da != db:
        raise ValueError("a and b must have the same dimensionality")

    if window is None:
        window = max(n, m)
    window = int(max(window, abs(n - m)))

    prev = np.full(m + 1, np.inf)
    curr = np.full(m + 1, np.inf)
    prev[0] = 0.0

    for i in range(1, n + 1):
        curr.fill(np.inf)
        j_start = max(1, i - window)
        j_end = min(m, i + window)
        ai = a[i - 1]
        for j in range(j_start, j_end + 1):
            diff = ai - b[j - 1]
            cost = float(np.dot(diff, diff))
            curr[j] = cost + min(prev[j], curr[j - 1], prev[j - 1])
        prev, curr = curr, prev

    return float(prev[m])


def pairwise_dtw_cost(
    X: np.ndarray,
    Y: np.ndarray | None = None,
    *,
    window: int | None = None,
) -> np.ndarray:
    """Pairwise DTW squared costs.

    X: (n, m) or (n, d, m)
    Y: (k, m) or (k, d, m); None means "X against itself"
    window: Sakoe-Chiba radius forwarded to dtw_cost
    Returns: (n, k) matrix of DTW squared costs.
    """
    X3 = _as_3d_panel(X)

    if Y is None:
        # Self-similarity: DTW cost is symmetric with a zero diagonal, so
        # compute only the upper triangle and mirror it (halves the work).
        n = X3.shape[0]
        out = np.zeros((n, n), dtype=float)
        for i in range(n):
            a = X3[i].T
            for j in range(i + 1, n):
                c = dtw_cost(a, X3[j].T, window=window)
                out[i, j] = c
                out[j, i] = c
        return out

    Y3 = _as_3d_panel(Y)
    n = X3.shape[0]
    k = Y3.shape[0]
    out = np.empty((n, k), dtype=float)
    for i in range(n):
        a = X3[i].T  # (m, d) time-major, as dtw_cost expects
        for j in range(k):
            out[i, j] = dtw_cost(a, Y3[j].T, window=window)
    return out


def dtw_rbf_kernel(
    X: np.ndarray,
    Y: np.ndarray | None = None,
    *,
    gamma: float,
    window: int | None = None,
) -> np.ndarray:
    """DTW-RBF similarity: exp(-gamma * DTW(x, x')^2), elementwise over all pairs."""
    costs = pairwise_dtw_cost(X, Y, window=window)
    return np.exp(-float(gamma) * costs)


def _acf(x: np.ndarray, max_lag: int) -> tuple[np.ndarray, np.ndarray]:
    x = np.asarray(x, dtype=float)
    x = x - x.mean()
    denom = float(np.dot(x, x))
    lags = np.arange(max_lag + 1)
    values = np.zeros(max_lag + 1)
    values[0] = 1.0
    if denom == 0.0:
        return lags, values
    for k in range(1, max_lag + 1):
        values[k] = float(np.dot(x[k:], x[:-k]) / denom)
    return lags, values
class TimeSeriesSVRTslearn:
    """tslearn-style time-series SVR wrapper.

    - For kernel in {linear, poly, rbf, sigmoid}: uses sklearn SVR on flattened windows.
    - For kernel='dtw_rbf': uses sklearn SVR with kernel='precomputed' on a DTW-RBF similarity.
    """

    def __init__(
        self,
        *,
        C: float = 1.0,
        kernel: str = "rbf",
        degree: int = 3,
        gamma: float | str | None = "scale",
        coef0: float = 0.0,
        epsilon: float = 0.1,
        tol: float = 1e-3,
        max_iter: int = -1,
        dtw_window: int | None = None,
        dtw_gamma: float | None = None,
    ):
        # SVR hyper-parameters (mirror sklearn.svm.SVR where applicable).
        self.C = float(C)
        self.kernel = str(kernel)
        self.degree = int(degree)
        self.gamma = gamma
        self.coef0 = float(coef0)
        self.epsilon = float(epsilon)
        self.tol = float(tol)
        self.max_iter = int(max_iter)
        # DTW options, consulted only when kernel == "dtw_rbf".
        self.dtw_window = dtw_window
        self.dtw_gamma = dtw_gamma

        # Fitted state, populated by fit().
        self.model_: SVR | None = None
        self.X_train_: np.ndarray | None = None
        self.n_timepoints_: int | None = None
        self.n_dims_: int | None = None
        self.dtw_gamma_: float | None = None

    def _resolve_dtw_gamma(self, X3: np.ndarray) -> float:
        """DTW-RBF bandwidth: the user's value, else a median heuristic."""
        if self.dtw_gamma is not None:
            return float(self.dtw_gamma)
        # Median heuristic on a small subset for speed
        n = X3.shape[0]
        idx = np.arange(n)
        if n > 60:
            idx = np.random.default_rng(0).choice(n, size=60, replace=False)
        D = pairwise_dtw_cost(X3[idx], window=self.dtw_window)
        offdiag = D[np.triu_indices_from(D, k=1)]
        offdiag = offdiag[offdiag > 0]
        med = float(np.median(offdiag)) if offdiag.size else 1.0
        return 1.0 / med

    def fit(self, X, y):
        """Fit on a panel X of shape (n, m) or (n, d, m) with targets y of length n."""
        X3 = _as_3d_panel(X)
        y = np.asarray(y, dtype=float)
        if y.shape[0] != X3.shape[0]:
            raise ValueError("X and y must have the same number of samples")

        _, d, m = X3.shape
        self.n_timepoints_ = int(m)
        self.n_dims_ = int(d)

        if self.kernel != "dtw_rbf":
            # Standard kernels: SVR on flattened windows.
            self.model_ = SVR(
                kernel=self.kernel,
                C=self.C,
                degree=self.degree,
                gamma=self.gamma,
                coef0=self.coef0,
                epsilon=self.epsilon,
                tol=self.tol,
                max_iter=self.max_iter,
            )
            self.model_.fit(_flatten_panel(X3), y)
            return self

        # DTW branch: keep the training panel so predict() can build
        # cross-similarities, then fit on the precomputed Gram matrix.
        self.X_train_ = X3
        self.dtw_gamma_ = self._resolve_dtw_gamma(X3)
        gram = dtw_rbf_kernel(X3, gamma=self.dtw_gamma_, window=self.dtw_window)
        self.model_ = SVR(
            kernel="precomputed",
            C=self.C,
            epsilon=self.epsilon,
            tol=self.tol,
            max_iter=self.max_iter,
        )
        self.model_.fit(gram, y)
        return self

    def predict(self, X) -> np.ndarray:
        """Predict targets for a panel with the same (d, m) layout as training."""
        if self.model_ is None:
            raise RuntimeError("Call fit() before predict().")

        X3 = _as_3d_panel(X)
        _, d, m = X3.shape
        if d != self.n_dims_ or m != self.n_timepoints_:
            raise ValueError(
                f"X must have shape (n,{self.n_dims_},{self.n_timepoints_}) (or (n,{self.n_timepoints_})), got {X3.shape}"
            )

        if self.kernel != "dtw_rbf":
            return self.model_.predict(_flatten_panel(X3))

        if self.X_train_ is None or self.dtw_gamma_ is None:
            raise RuntimeError("Model is missing DTW state; refit.")
        gram = dtw_rbf_kernel(X3, self.X_train_, gamma=self.dtw_gamma_, window=self.dtw_window)
        return self.model_.predict(gram)

Demo: one-step forecasting via sliding windows#

We build a regression dataset from a single multi-seasonal series:

  • input \(X_t = (y_{t-L},\dots,y_{t-1})\)

  • target \(y_t\)

Then we compare:

  • SVR with a DTW-RBF kernel (kernel="dtw_rbf") and

  • SVR with a standard RBF kernel on flattened windows.

def simulate_ar1_noise(n: int, *, phi: float, sigma: float, rng: np.random.Generator) -> np.ndarray:
    """Simulate an AR(1) path u_t = phi * u_{t-1} + eps_t with u_0 = 0.

    Draws n shocks eps ~ N(0, sigma) from rng (the first shock is drawn but
    unused, keeping the RNG stream identical across implementations).
    """
    shocks = rng.normal(0.0, sigma, size=n)
    path = np.zeros(n)
    for t, shock in enumerate(shocks[1:], start=1):
        path[t] = phi * path[t - 1] + shock
    return path


def make_sliding_windows(y: np.ndarray, window_length: int) -> tuple[np.ndarray, np.ndarray]:
    """Turn a series into (lag-window, next-value) supervised pairs.

    X[i] = y[i : i+L] and y_next[i] = y[i+L], for i = 0 .. len(y)-L-1.

    Raises:
        ValueError: if y has no complete (window, target) pair.
    """
    y = np.asarray(y, dtype=float)
    L = int(window_length)
    if y.size <= L:
        raise ValueError("y is too short")
    # Vectorized windowing: one strided view instead of L Python-level column
    # slices; copy so X owns its memory and stays independently writable.
    X = np.lib.stride_tricks.sliding_window_view(y, L)[:-1].copy()
    y_next = y[L:]
    return X, y_next


# --- Build a synthetic daily series: linear trend + weekly and monthly cycles
# --- plus autocorrelated AR(1) noise, indexed by calendar dates.
n = 420
idx = pd.date_range("2022-01-01", periods=n, freq="D")
t = np.arange(n)

weekly = 1.6 * np.sin(2 * np.pi * t / 7) + 0.4 * np.cos(2 * np.pi * t / 7)
monthly = 1.0 * np.sin(2 * np.pi * t / 30) - 0.3 * np.cos(2 * np.pi * t / 30)
trend = 0.03 * t
noise = simulate_ar1_noise(n, phi=0.6, sigma=0.6, rng=rng)

# Offset of 20 keeps the level positive; wrap in a Series for date plotting.
y = 20.0 + trend + weekly + monthly + noise
y = pd.Series(y, index=idx, name="y")

fig = go.Figure()
fig.add_trace(go.Scatter(x=y.index, y=y, name="y", line=dict(color="black")))
fig.update_layout(title="Synthetic multi-seasonal series", xaxis_title="date", yaxis_title="value")
fig.show()
# Window length L = 35 spans the weekly and most of the monthly cycle.
L = 35
X, y_next = make_sliding_windows(y.to_numpy(), window_length=L)

# Hold out the last h targets as a chronological test split.
h = 80
X_train, y_train = X[:-h], y_next[:-h]
X_test, y_test = X[-h:], y_next[-h:]
t_test = y.index[-h:]

# Keep DTW demo small enough to run quickly
n_train_max = 140
if X_train.shape[0] > n_train_max:
    # Use the most recent windows so the DTW model sees current dynamics.
    X_train_small = X_train[-n_train_max:]
    y_train_small = y_train[-n_train_max:]
else:
    X_train_small = X_train
    y_train_small = y_train

# DTW-RBF SVR: Sakoe-Chiba radius 10; dtw_gamma=None triggers the median heuristic.
svr_dtw = TimeSeriesSVRTslearn(
    C=10.0,
    kernel="dtw_rbf",
    epsilon=0.2,
    dtw_window=10,
    dtw_gamma=None,
).fit(X_train_small, y_train_small)

pred_dtw = svr_dtw.predict(X_test)

# Baseline: standard RBF SVR on flattened lag windows (full training set).
svr_rbf = TimeSeriesSVRTslearn(
    C=10.0,
    kernel="rbf",
    gamma="scale",
    epsilon=0.2,
).fit(X_train, y_train)

pred_rbf = svr_rbf.predict(X_test)

def summarize(name: str, y_true: np.ndarray, y_pred: np.ndarray) -> None:
    """Print MAE, RMSE and R^2 for predictions against the truth."""
    mae = mean_absolute_error(y_true, y_pred)
    # sklearn >= 1.6 removed the `squared` kwarg from mean_squared_error
    # (the notebook's traceback below); take the square root explicitly.
    rmse = float(np.sqrt(mean_squared_error(y_true, y_pred)))
    r2 = r2_score(y_true, y_pred)
    print(f"{name}: MAE={mae:.3f} RMSE={rmse:.3f} R^2={r2:.3f}")


# Compare the alignment-aware kernel against the plain RBF baseline.
summarize("SVR(DTW-RBF)", y_test, pred_dtw)
summarize("SVR(RBF on lags)", y_test, pred_rbf)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[5], line 44
     40     r2 = r2_score(y_true, y_pred)
     41     print(f"{name}: MAE={mae:.3f} RMSE={rmse:.3f} R^2={r2:.3f}")
---> 44 summarize("SVR(DTW-RBF)", y_test, pred_dtw)
     45 summarize("SVR(RBF on lags)", y_test, pred_rbf)

Cell In[5], line 39, in summarize(name, y_true, y_pred)
     37 def summarize(name: str, y_true: np.ndarray, y_pred: np.ndarray) -> None:
     38     mae = mean_absolute_error(y_true, y_pred)
---> 39     rmse = mean_squared_error(y_true, y_pred, squared=False)
     40     r2 = r2_score(y_true, y_pred)
     41     print(f"{name}: MAE={mae:.3f} RMSE={rmse:.3f} R^2={r2:.3f}")

File ~/miniconda3/lib/python3.12/site-packages/sklearn/utils/_param_validation.py:194, in validate_params.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
    191 func_sig = signature(func)
    193 # Map *args/**kwargs to the function signature
--> 194 params = func_sig.bind(*args, **kwargs)
    195 params.apply_defaults()
    197 # ignore self/cls and positional/keyword markers

File ~/miniconda3/lib/python3.12/inspect.py:3277, in Signature.bind(self, *args, **kwargs)
   3272 def bind(self, /, *args, **kwargs):
   3273     """Get a BoundArguments object, that maps the passed `args`
   3274     and `kwargs` to the function's signature.  Raises `TypeError`
   3275     if the passed arguments can not be bound.
   3276     """
-> 3277     return self._bind(args, kwargs)

File ~/miniconda3/lib/python3.12/inspect.py:3266, in Signature._bind(self, args, kwargs, partial)
   3256         raise TypeError(
   3257             'got some positional-only arguments passed as '
   3258             'keyword arguments: {arg!r}'.format(
   (...)   3263             ),
   3264         )
   3265     else:
-> 3266         raise TypeError(
   3267             'got an unexpected keyword argument {arg!r}'.format(
   3268                 arg=next(iter(kwargs))))
   3270 return self._bound_arguments_cls(self, arguments)

TypeError: got an unexpected keyword argument 'squared'
# Overlay actuals and both models' one-step-ahead predictions on the test window.
fig = go.Figure()
fig.add_trace(go.Scatter(x=t_test, y=y_test, name="actual", line=dict(color="black")))
fig.add_trace(go.Scatter(x=t_test, y=pred_dtw, name="SVR(DTW-RBF)", line=dict(color="#4E79A7")))
fig.add_trace(go.Scatter(x=t_test, y=pred_rbf, name="SVR(RBF)", line=dict(color="#E15759")))
fig.update_layout(title="One-step-ahead predictions", xaxis_title="date", yaxis_title="value")
fig.show()
# Residual diagnostics (DTW model)
resid = y_test - pred_dtw

print("residual mean:", float(np.mean(resid)))
print("residual std:", float(np.std(resid, ddof=1)))
print("Jarque-Bera:", stats.jarque_bera(resid))

# Residual autocorrelations with the usual +/- 1.96/sqrt(n) white-noise band.
lags, acf_vals = _acf(resid, max_lag=30)
bound = 1.96 / np.sqrt(resid.size)

# QQ data: plotting positions (i - 0.5)/n against standard-normal quantiles,
# with residuals standardized so the reference line is y = x.
nq = resid.size
p = (np.arange(1, nq + 1) - 0.5) / nq
theoretical = stats.norm.ppf(p)
sample_q = np.sort((resid - resid.mean()) / resid.std(ddof=1))

fig = make_subplots(
    rows=2,
    cols=2,
    subplot_titles=("Residuals over time", "Residual histogram", "Residual ACF", "QQ plot (std residuals)"),
)

fig.add_trace(go.Scatter(x=t_test, y=resid, name="residuals", line=dict(color="#4E79A7")), row=1, col=1)
fig.add_hline(y=0, line=dict(color="black", dash="dash"), row=1, col=1)

fig.add_trace(go.Histogram(x=resid, nbinsx=25, name="hist", marker_color="#4E79A7"), row=1, col=2)

fig.add_trace(go.Bar(x=lags, y=acf_vals, name="ACF(resid)", marker_color="#4E79A7"), row=2, col=1)
fig.add_trace(go.Scatter(x=[0, lags.max()], y=[bound, bound], mode="lines", line=dict(color="gray", dash="dash"), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(x=[0, lags.max()], y=[-bound, -bound], mode="lines", line=dict(color="gray", dash="dash"), showlegend=False), row=2, col=1)

fig.add_trace(go.Scatter(x=theoretical, y=sample_q, mode="markers", name="QQ", marker=dict(color="#4E79A7")), row=2, col=2)
fig.add_trace(
    go.Scatter(x=[theoretical.min(), theoretical.max()], y=[theoretical.min(), theoretical.max()], mode="lines", line=dict(color="black", dash="dash"), showlegend=False),
    row=2,
    col=2,
)

fig.update_layout(height=750, title="Residual diagnostics (SVR DTW-RBF)")
fig.show()
residual mean: -0.24839058285380267
residual std: 3.4313360450269337
Jarque-Bera: SignificanceResult(statistic=3.9226931240248826, pvalue=0.14066887396930364)